#!/usr/bin/python3 -cimport os, sys; os.execv(os.path.dirname(sys.argv[1]) + "/../common/pywrap", sys.argv)

# This file is part of Cockpit.
#
# Copyright (C) 2013 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <https://www.gnu.org/licenses/>.

import time

import testlib
from machine import testvm


class TestServices(testlib.MachineCase):

    def setUp(self):
        super().setUp()

        # Make sure the system finishes "booting" so that
        # when we add additional services to this target below
        # we don't race with the boot process
        self.machine.execute("while ! systemctl is-active default.target; do sleep 1; done")

        # We manipulate with `/etc/systemd/system` so `daemon-reload` to keep it in proper state
        # Also `reset-failed` as failed units can be still listed with `systemctl` even when all files
        # were removed and daemon reloaded.
        self.restore_dir("/etc/systemd/system", "systemctl daemon-reload; systemctl reset-failed")
        self.restore_dir("/etc/systemd/user")
        self.restore_dir("/etc/xdg/systemd/user")
        self.restore_dir("/home/admin/.config")

    def run_systemctl(self, user, cmd):
        if user:
            self.machine.execute(f"su - admin -c 'export XDG_RUNTIME_DIR=/run/user/$(id -u admin); systemctl --user {cmd}'")
        else:
            self.machine.execute(f"systemctl {cmd}")

    def pick_tab(self, n):
        self.browser.click(f'#services-filter li:nth-child({n}) a')

    def wait_onoff(self, val):
        self.browser.wait_visible(".service-top-panel .pf-v5-c-switch__input" + (":checked" if val else ":not(:checked)"))

    def toggle_onoff(self):
        self.browser.click(".service-top-panel .pf-v5-c-switch__input")
        # move away from the button again to remove the tooltip; it covers the kebab
        self.browser.mouse(".service-top-panel .pf-v5-c-switch__input", "mouseleave")
        self.browser.wait_not_present("[role='tooltip']")

    def do_action(self, action):
        self.browser.click(".service-top-panel button.pf-v5-c-menu-toggle")
        self.browser.click(f".service-top-panel .pf-v5-c-menu__list-item:contains('{action}') button")

    def check_service_details(self, statuses, actions, enabled, onoff=True, kebab=True):
        with self.browser.wait_timeout(60):
            self.browser.wait_collected_text("#statuses .status", "".join(statuses))
        if onoff:
            self.wait_onoff(val=enabled)
        else:
            self.browser.wait_not_present(".service-top-panel .pf-v5-c-switch")

        if kebab:
            self.browser.click(".service-top-panel button.pf-v5-c-menu-toggle")
            self.browser.wait_text(".service-top-panel .pf-v5-c-menu__list", "".join(actions))
            # Click away to close the pf-v5-c-menu__list
            self.browser.click(".service-top-panel button.pf-v5-c-menu-toggle")
        else:
            self.browser.wait_not_present(".service-top-panel .pf-v5-c-menu__list")

    def check_service_on(self, expect_reload=True):
        self.check_service_details(
            ["Running", "Automatically starts"],
            ["Reload" if expect_reload else "", "Restart", "Stop", "Disallow running (mask)", "Pin unit"],
            enabled=True)

    def check_service_off(self):
        self.check_service_details(["Disabled"],
                                   ["Start", "Disallow running (mask)", "Pin unit"],
                                   enabled=False)

    def svc_sel(self, service):
        return f'tr[data-goto-unit="{service}"]'

    def goto_service(self, service):
        return self.browser.click(self.svc_sel(service) + ' a')

    def wait_service_state(self, service, state):
        state_new = state
        if 'inactive' in state:
            state_new = 'Not running'
        elif 'active' in state:
            state_new = 'Running'
        elif 'failed' in state:
            state_new = 'Failed to'

        self.browser.wait_in_text(self.svc_sel(service), state_new)

    def select_file_state(self, states):
        if not isinstance(states, list):
            states = [states]

        self.browser.click("#services-dropdown-file-state")
        for state in states:
            self.browser.set_checked(f".pf-v5-c-menu__item:contains('{state}') input", val=True)
        self.browser.click("#services-dropdown-file-state")
        for state in states:
            self.browser.wait_visible(f".pf-v5-c-chip-group__label:contains('File state') + .pf-v5-c-chip-group__list li:contains('{state}')")

    def select_active_state(self, states):
        if not isinstance(states, list):
            states = [states]

        self.browser.click("#services-dropdown-active-state")
        for state in states:
            self.browser.set_checked(f".pf-v5-c-menu__item:contains('{state}') input", val=True)
        self.browser.click("#services-dropdown-active-state")
        for state in states:
            self.browser.wait_visible(f".pf-v5-c-chip-group__label:contains('Active state') + .pf-v5-c-chip-group__list li:contains('{state}')")

    def wait_page_load(self):
        self.browser.wait_not_present(".pf-v5-c-empty-state .pf-v5-c-spinner[aria-valuetext='Loading...']")

    def wait_service_in_panel(self, service, title):
        self.wait_service_state(service, title)

    def wait_service_present(self, service):
        service_name = service[:-8] if service.endswith(".service") else service
        self.browser.wait_text(f'tr[data-goto-unit="{service}"] .service-unit-id', service_name)

    def wait_service_not_present(self, service):
        self.browser.wait_not_present(self.svc_sel(service))

    def make_test_service(self, path="/etc/systemd/system/"):
        self.write_file(f"{path}/test.service",
                        """
[Unit]
Description=Test Service

[Service]
ExecStart=/usr/local/bin/test-service
ExecReload=/bin/true

[Install]
WantedBy=default.target
""")
        self.write_file("/usr/local/bin/test-service",
                        """#!/bin/sh
set -eu
trap "echo STOP" 0

if [ $(id -u) -eq 0 ]; then
    journalctl --sync
fi

# increase the chance for journal to catch up
sleep 5

echo START
while true; do
  sleep 5
  echo WORKING
done
""", perm="755")
        # After writing files out tell systemd about them
        self.run_systemctl(user=False, cmd="daemon-reload")
        self.run_systemctl(user=True, cmd="daemon-reload || true")

        # When the test fails while `test.service` is active, the process keeps running and
        # `systemctl status test.service` still shows it as active until this process dies
        self.addCleanup(
            self.machine.execute,
            "for op in stop reset-failed disable; do systemctl $op test test-fail || true; done")
        self.addCleanup(
            self.machine.execute,
            "su - admin -c 'export XDG_RUNTIME_DIR=/run/user/$(id -u admin); "
            "for op in stop reset-failed disable; do systemctl --user $op test test-fail || true; done'")

    def testBasic(self):
        self._testBasic()

    def testBasicSession(self):
        self._testBasic(user=True)

    def _testBasic(self, user=False):
        m = self.machine
        b = self.browser

        path = "/etc/systemd/user" if user else "/etc/systemd/system"

        self.write_file(f"{path}/test-fail.service",
                        """
[Unit]
Description=Failing Test Service

[Service]
ExecStart=/usr/bin/false

[Install]
WantedBy=default.target
""")
        self.write_file(f"{path}/special@:-characters.service",
                        """
[Unit]
Description=Service With Special Characters in Id

[Service]
ExecStart=/usr/bin/true

[Install]
WantedBy=default.target
""")

        self.make_test_service(path)

        url = "/system/services#/?owner=user" if user else "/system/services"
        self.login_and_go(url, user="admin")
        self.wait_page_load()

        self.wait_service_present("test.service")
        self.wait_service_in_panel("test.service", "Disabled")
        self.wait_service_state("test.service", "inactive")

        self.run_systemctl(user, "start test.service")
        self.wait_service_state("test.service", "active")
        self.wait_service_in_panel("test.service", "Disabled")

        self.run_systemctl(user, "stop test.service")
        self.wait_service_state("test.service", "inactive")
        self.wait_service_in_panel("test.service", "Disabled")

        self.run_systemctl(user, "enable test.service")
        self.wait_service_state("test.service", "inactive")
        self.wait_service_in_panel("test.service", "Enabled")

        self.run_systemctl(user, "start test.service")

        b.wait_attr("#services-toolbar", "data-loading", "false")

        self.wait_service_present("special@:-characters.service")
        self.wait_service_in_panel("special@:-characters.service", "Disabled")
        self.wait_service_state("special@:-characters.service", "inactive")

        # Test breadcrumb link when service id contains special characters
        self.goto_service("special@:-characters.service")
        b.wait_in_text(".pf-v5-c-breadcrumb", "special@:-characters.service")
        b.click(".pf-v5-c-breadcrumb a:contains('Services')")
        b.wait_visible("#services-list")

        suffix = "?owner=user" if user else ''

        # Check empty state error for nonexistent unit with valid name
        b.go(f'/system/services#/nonexistent.service{suffix}')
        b.wait_in_text(".pf-v5-c-empty-state", "Unit nonexistent.service not found")
        b.click("button:contains('View all services')")
        b.switch_to_top()
        b.wait_js_cond('window.location.pathname == "/system/services"')
        if user:
            b.wait_js_cond(f"window.location.hash === '#/{suffix}'")

        # Check empty state error for invalid unit name
        b.go(f'/system/services#/nonexistent{suffix}')
        b.enter_page("/system/services")
        b.wait_in_text(".pf-v5-c-empty-state", "Loading unit failed")
        b.wait_in_text(".pf-v5-c-empty-state", "Unit name nonexistent is not valid")

        b.go(f"#/{suffix}")

        # Survives a burst of events
        self.wait_service_present("test.service")
        m.execute("udevadm trigger; udevadm settle")
        self.goto_service("test.service")
        self.check_service_on()

        # Stop and disable and back again
        self.toggle_onoff()
        self.check_service_off()
        self.toggle_onoff()
        self.check_service_on()
        self.toggle_onoff()  # later on we need some disabled test
        self.check_service_off()

        # Check service that fails to start
        b.go(f'/system/services#/{suffix}')
        self.goto_service("test-fail.service")
        self.check_service_off()
        self.toggle_onoff()
        self.check_service_details(["Failed to start", "Automatically starts"],
                                   ["Start", "Clear 'Failed to start'", "Disallow running (mask)", "Pin unit"],
                                   enabled=True)
        if not user:
            b.assert_pixels("#service-details-unit", "details-test-fail",
                            # in medium layout we sometimes get a scrollbar depending on how many test-fail logs exist
                            skip_layouts=["medium"],
                            # ignore the switcher, it causes a tiny flake around the sides.
                            ignore=[".pf-v5-c-switch__toggle"])
        b.click(".action-button:contains('Start service')")
        b.go(f'/system/services#/{suffix}')
        self.wait_service_present("test-fail.service")
        self.wait_service_state("test-fail.service", "failed")

        # Check static service
        self.goto_service("systemd-exit.service")
        self.check_service_details(["Static", "Not running"],
                                   ["Start", "Disallow running (mask)", "Pin unit"],
                                   enabled=True, onoff=False)
        # Check that journalbox shows empty state
        b.wait_text('.cockpit-log-panel .pf-v5-c-card__body', "No log entries")
        b.wait_not_present("button:contains('View all logs')")

        # Mask and unmask
        self.do_action("Disallow running (mask)")
        b.click("#mask-service button.pf-m-danger")
        self.check_service_details(["Masked"], ["Allow running (unmask)"], enabled=False, onoff=False)

        # Masked services have no relationships and therefore the expandable section should not be present
        b.wait_not_present("#service-details-show-relationships")

        self.do_action("Allow running (unmask)")
        self.check_service_details(["Static", "Not running"],
                                   ["Start", "Disallow running (mask)", "Pin unit"],
                                   enabled=True, onoff=False)

        # Pin unit
        b.go(f'/system/services#/{suffix}')
        b.wait_not_present('#test.service > svg.service-thumbtack-icon-color')

        self.goto_service("test.service")
        self.check_service_details(["Disabled"], ["Start", "Disallow running (mask)", "Pin unit"], enabled=False)
        b.wait_not_present('#service-details-unit > article > div > svg.service-thumbtack-icon')
        self.do_action("Pin unit")
        b.is_present('#service-details-unit > article > div > svg.service-thumbtack-icon')
        b.go(f'/system/services#/{suffix}')

        # returns index of first unit that isn't failed
        b.eval_js("""
                    function firstWorkingUnitPos() {
                        const services = document.getElementById('services-list');
                        for (let i = 0; i < services.childElementCount; i++) {
                            const tbody = services.children[i];
                            if (!tbody.firstChild.classList.contains('service-unit-failed'))
                                return i;
                        }
                    }
                  """)
        b.wait_visible("#services-list")
        pos = b.eval_js('firstWorkingUnitPos();')
        b.wait_text(f'#services-list > tbody:nth-child({pos + 1}) > tr > th > div > div > a', 'test')
        b.is_present('#test.service > svg.service-thumbtack-icon-color')

        # Unpin unit
        self.goto_service("test.service")
        self.check_service_details(["Disabled"], ["Start", "Disallow running (mask)", "Unpin unit"], enabled=False)
        self.do_action("Unpin unit")
        b.wait_not_present('#service-details-unit > article > div > svg.service-thumbtack-icon')
        self.check_service_off()
        b.go(f'/system/services#/{suffix}')
        b.wait_not_present('#test.service > svg.service-thumbtack-icon-color')

    @testlib.nondestructive
    def testFilter(self):
        b = self.browser

        self.write_file("/run/systemd/system/test-fail.service",
                        """
[Unit]
Description=Failing Test Service

[Service]
ExecStart=/usr/bin/false

[Install]
WantedBy=default.target
""")
        self.make_test_service("/etc/systemd/system")
        self.machine.execute("systemctl enable --now test-fail.service")

        def init_filter_state():
            if not b.is_visible("#services-text-filter"):
                b.click(".pf-v5-c-toolbar__toggle button")

            if b.is_present(".pf-v5-c-toolbar__expandable-content.pf-m-expanded button:contains('Clear all filters')"):
                b.click(".pf-v5-c-toolbar__expandable-content.pf-m-expanded button:contains('Clear all filters')")
            elif b.is_present(".pf-v5-c-toolbar__content > .pf-v5-c-toolbar__item > button:contains('Clear all filters')"):
                b.click(".pf-v5-c-toolbar__content > .pf-v5-c-toolbar__item > button:contains('Clear all filters')")
            else:
                b.set_input_text("#services-text-filter input", "")

            self.wait_service_present("test.service")
            self.wait_service_present("test-fail.service")

        self.login_and_go('/system/services')
        self.wait_service_present("test-fail.service")
        self.wait_service_state("test-fail.service", "failed")

        # Filter by id
        init_filter_state()
        b.set_input_text("#services-text-filter input", "fail.ser")
        self.wait_service_not_present("test.service")
        self.wait_service_present("test-fail.service")

        # Filter by description capital letters included
        init_filter_state()
        b.set_input_text("#services-text-filter input", "Test Service")
        # test.service is not loaded, thus description search does not find it
        self.wait_service_present("test-fail.service")
        b.assert_pixels("#services-page", "text-filter-test", skip_layouts=["mobile"])

        if b.pixels_label:
            b.set_layout("mobile")
            # HACK: reload for PF 5.4.0 to detect the new layout and breakpoints
            # https://github.com/patternfly/patternfly-react/issues/10897
            b.reload()
            b.enter_page("/system/services")
            # Waiting a bit for the layout to stabilize.  The scrolling of
            # the header happens asynchronously with an animation.
            time.sleep(2)
            # Now scroll the header all the way to the left to get a conistent pixel test result
            nav_scroll_btn = ".services-header button[aria-label='Scroll back']"
            while b.call_js_func("ph_attr", nav_scroll_btn, "disabled") is None:
                b.click(nav_scroll_btn)
                time.sleep(0.5)
            b.assert_pixels_in_current_layout("#services-page", "text-filter-test")
            b.set_layout("desktop")
            self.wait_page_load()

        # Filter by description capitalization not matching the unit description
        init_filter_state()
        b.set_input_text("#services-text-filter input", "failing test service")
        self.wait_service_not_present("test.service")
        self.wait_service_present("test-fail.service")

        # Filter by Id capitalization not matching the unit Id
        init_filter_state()
        b.set_input_text("#services-text-filter input", "networkmanager")
        self.wait_service_present("NetworkManager.service")

        # Select only static services
        init_filter_state()
        self.select_file_state("Static")
        self.wait_service_not_present("test.service")
        self.wait_service_not_present("test-fail.service")
        self.wait_service_present("systemd-exit.service")

        # Select only disabled services
        init_filter_state()
        self.select_file_state("Disabled")
        self.wait_service_present("test.service")
        self.wait_service_not_present("test-fail.service")
        self.wait_service_not_present("systemd-exit.service")

        # Select only stopped services
        init_filter_state()
        self.select_active_state("Not running")
        self.wait_service_present("test.service")
        self.wait_service_not_present("test-fail.service")

        # Select only failed services
        init_filter_state()
        self.select_active_state("Failed to start")
        self.wait_service_not_present("test.service")
        self.wait_service_present("test-fail.service")

        # Select Alias and Masked services
        self.machine.execute("systemctl mask test-fail.service")
        self.wait_service_in_panel("test-fail.service", "Masked")
        init_filter_state()
        self.select_file_state(["Indirect", "Masked"])
        self.wait_service_not_present("test.service")
        self.wait_service_present("test-fail.service")
        self.wait_service_present("getty@tty1.service")

        # Check filtering and selecting together - empty state
        init_filter_state()
        b.set_input_text("#services-text-filter input", "failing")
        self.select_active_state("Not running")
        self.wait_service_not_present("test.service")
        b.wait_visible("#services-page .pf-v5-c-empty-state")

        # Check resetting filter
        b.click("#clear-all-filters")
        self.wait_service_present("test.service")
        self.wait_service_present("test-fail.service")
        self.wait_service_present("systemd-exit.service")
        self.assertEqual(b.val("#services-text-filter input"), "")

        # Check that closing filter chip groups or single chips works
        init_filter_state()
        self.select_active_state("Not running")
        self.select_active_state("Running")
        self.wait_service_present("test.service")
        b.click(".pf-v5-c-chip-group__label:contains('Active state') + .pf-v5-c-chip-group__list li:contains('Not running') button")
        b.wait_not_present(".pf-v5-c-chip-group__label:contains('Active state') + .pf-v5-c-chip-group__list li:contains('Not running')")
        self.wait_service_not_present("test.service")
        b.click(".pf-v5-c-chip-group:contains('Active state') .pf-v5-c-chip-group__close > button")
        b.wait_not_present(".pf-v5-c-chip-group")

    @testlib.nondestructive
    def testTimer(self):
        self._testTimer(user=False)

    @testlib.nondestructive
    def testTimerSession(self):
        self._testTimer(user=True)

    def _testTimer(self, user):
        m = self.machine
        b = self.browser

        path = "/etc/systemd/user" if user else "/etc/systemd/system"
        self.write_file(f"{path}/test.timer",
                        """
[Unit]
Description=Test Timer

[Timer]
OnCalendar=*:1/2
""")
        self.write_file(f"{path}/test-onboot.timer",
                        """
[Unit]
Description=Test OnBoot Timer

[Timer]
OnBootSec=200min
Unit=test.service
""")
        self.write_file("/usr/local/lib/systemd/system/cockpit-system.timer",
                        """
[Unit]
Description=Not deleteable Timer

[Timer]
OnCalendar=*:1/2
""")
        self.write_file(f"{path}/deleteme.service",
                        """
[Unit]
Description=Delete Me Service

[Service]
ExecStart=/usr/bin/true

[Install]
WantedBy=default.target
""")
        self.write_file(f"{path}/deleteme.timer",
                        """
[Unit]
Description=Delete Me Timer

[Timer]
OnCalendar=*:1/2

# just for user mode
[Install]
WantedBy=default.target
""")
        self.make_test_service(path)

        # ensure we have one running timer; in user mode we do not have
        # a running logind session at this point yet, so enable instead
        self.run_systemctl(user=False, cmd=("enable --global --runtime" if user else "start") + " deleteme.timer")
        self.addCleanup(self.run_systemctl, user, "stop deleteme.timer || true")

        # Select Timer tab
        self.login_and_go(f'/system/services#/{"?owner=user" if user else ""}')
        self.pick_tab(4)
        b.wait_not_in_text("#services-list", "test-fail")
        b.wait_not_in_text("#services-list", ".target")
        b.wait_not_in_text("#services-list", ".socket")
        b.wait_visible(self.svc_sel('test.timer'))
        b.wait_text(self.svc_sel('test.timer') + ' .service-unit-triggers', '')
        today = b.eval_js("Intl.DateTimeFormat('en', { dateStyle: 'medium' }).format()")
        # timer from initial page/units load
        b.wait_in_text(self.svc_sel('deleteme.timer') + ' .service-unit-next-trigger', today)
        # timer gets updated on PropertiesChanged event
        self.run_systemctl(user, "start test.timer")
        b.wait_in_text(self.svc_sel('test.timer') + ' .service-unit-next-trigger', today)  # next run
        b.wait_in_text(self.svc_sel('test.timer') + ' .service-unit-last-trigger', "unknown")  # last trigger

        # Timer details should also show information about next and last trigger
        self.goto_service("test.timer")
        b.wait_in_text('.service-unit-next-trigger', today)  # next run
        b.wait_in_text('.service-unit-last-trigger', "unknown")  # last trigger
        b.click(".pf-v5-c-breadcrumb a:contains('Services')")

        self.run_systemctl(user, "stop test.timer")

        b.wait_visible(self.svc_sel('test-onboot.timer'))
        b.wait_text(self.svc_sel('test-onboot.timer') + ' .service-unit-triggers', '')
        self.run_systemctl(user, "start test-onboot.timer")
        # Check the next run. Since it triggers 200mins after the boot, it might be today or tomorrow
        # this is too racy to predict accurately
        today = m.execute("date '+%b %-d, %Y'").strip()
        tomorrow = m.execute("date --date tomorrow '+%b %-d, %Y'").strip()
        sel_next = self.svc_sel('test-onboot.timer') + ' .service-unit-next-trigger'
        b.wait_in_text(sel_next, ", ")
        self.assertRegex(b.text(sel_next), f"{today}|{tomorrow}")
        b.wait_in_text(self.svc_sel('test-onboot.timer') + ' .service-unit-last-trigger', "unknown")  # last trigger
        self.run_systemctl(user, "stop test-onboot.timer")

        if not user:
            self.goto_service("deleteme.timer")
            self.do_action("Delete")
            b.click("#delete-timer-modal-btn")
            b.wait_not_present(".pf-v5-c-modal-box")
            self.wait_page_load()
            self.assertNotIn("deleteme", m.execute("systemctl list-timers"))
            m.execute(f"! test -f {path}/deleteme.service")
            m.execute(f"! test -f {path}/deleteme.timer")

            # system timers are not allowed to be deleted
            self.goto_service("cockpit-system.timer")
            # Check our custom kebab menu in RTL layout separately as the tests switch to RTL layout by editing
            # html directly while in a real life scenario we would reload the page. This circumvents the PF dropdown
            # from rendering in the wrong place when making a pixel test.
            b.set_layout("rtl")
            # wait for layout to settle
            time.sleep(1)
            b.click(".service-top-panel button.pf-v5-c-menu-toggle")
            b.wait_in_text(".service-top-panel .pf-v5-c-menu__list", "Disallow running")
            b.assert_pixels_in_current_layout("#service-details-unit", "services-kebabdropdown-rtl")
            # Close menu
            b.click(".service-top-panel button.pf-v5-c-menu-toggle")
            b.wait_not_present(".pf-v5-c-menu__list")
            b.set_layout("desktop")
            b.click(".service-top-panel button.pf-v5-c-menu-toggle")
            b.wait_in_text(".service-top-panel .pf-v5-c-menu__list", "Disallow running")
            # check if our custom kebab menu renders properly
            b.assert_pixels("#service-details-unit", "services-kebabdropdown", skip_layouts=['rtl'])
            b.wait_not_in_text(".service-top-panel .pf-v5-c-menu__list", "Delete")
            b.click(".pf-v5-c-breadcrumb a:contains('Services')")

    @testlib.nondestructive
    def testServiceMetrics(self):
        self._testServiceMetrics(user=False)

    @testlib.nondestructive
    def testServiceMetricsSession(self):
        self._testServiceMetrics(user=True)

    def _testServiceMetrics(self, user):
        m = self.machine
        b = self.browser

        self.login_and_go(f'/system/services#/{"?owner=user" if user else ""}')

        params = "-user" if user else ""
        path = "/etc/systemd/user" if user else "/etc/systemd/system"

        self.write_file(f"/tmp/mem-hog{params}.awk", f'BEGIN {{ system("while [ ! -f /tmp/continue{params} ]; do sleep 1; done"); y = sprintf("%50000000s",""); system("sleep infinity") }}')
        self.write_file(f"{path}/mem-hog.service",
                        f"""
[Unit]
Description=Mem Hog Service

[Service]
ExecStart=/bin/awk -f /tmp/mem-hog{params}.awk
""")

        self.run_systemctl(user, "daemon-reload")

        # Check that MemoryCurrent is shown
        self.goto_service("mem-hog.service")
        b.wait_not_present("#memory")
        # In some distros systemctl is showing memory current as [Not Set] for user units
        if not user or not m.image.startswith("rhel-8-"):
            self.run_systemctl(user, "start mem-hog.service")
            # If the test fails before we stop mem-hog, the next test run will not get the correct memory usage here
            self.addCleanup(self.run_systemctl, user, "stop mem-hog.service || true")
            # initial memory detection takes very long especially on RHEL 8
            with b.wait_timeout(60):
                b.wait_visible("#memory")
            initial_memory = float(b.text("#memory").split(" ")[0])
            self.assertGreater(initial_memory, 0.5)
            m.execute(f"touch /tmp/continue{params}")
            self.addCleanup(m.execute, f"rm /tmp/continue{params}")
            # MemoryCurrent auto-updates every 30s - when it updates the memory used should be ~50MiB
            with b.wait_timeout(60):
                b.wait(lambda: float(b.text("#memory").split(" ")[0]) > 40)
            self.run_systemctl(user, "stop mem-hog.service")
            b.wait_not_present("#memory")
        # Check that listen is not shown for .service units
        b.wait_not_present("#listen")

    @testlib.nondestructive
    def testOtherTypes(self):
        self._testOtherTypes(user=False)

    @testlib.nondestructive
    def testOtherTypesSession(self):
        self._testOtherTypes(user=True)

    def _testOtherTypes(self, user):
        b = self.browser

        self.login_and_go(f'/system/services#/{"?owner=user" if user else ""}')

        path = "/etc/systemd/user" if user else "/etc/systemd/system"
        self.make_test_service(path)

        # Targets tab
        self.pick_tab(2)
        b.wait_not_in_text("#services-list", "test")
        self.wait_service_state("basic.target", "active")

        if not user:
            # Make sure that symlinked services also appear in the list
            self.wait_service_state("reboot.target", "inactive")
            self.wait_service_present("ctrl-alt-del.target")
            self.goto_service("ctrl-alt-del.target")
            b.wait_in_text(".service-name", "Reboot")
            b.click("#services-page .pf-v5-c-breadcrumb__link")

        # Sockets tab
        self.pick_tab(3)
        b.wait_not_in_text("#services-list", "test")
        b.wait_not_in_text("#services-list", ".target")
        self.wait_service_state("dbus.socket", "active (running)")

        # Check that `Listen` and `Triggers` properties are shown for socket units
        self.goto_service("dbus.socket")
        if not user:
            self.assertIn("/run/dbus/system_bus_socket (Stream)", b.text("#listen"))
        else:
            self.assertIn("/run/user/", b.text("#listen"))
        self.assertIn(b.text("#Triggers"), ["dbus.service", "dbus-broker.service"])
        b.click(".pf-v5-c-breadcrumb a:contains('Services')")

        # Paths tab - the test VM does not have any user path units
        if not user:
            self.pick_tab(5)
            b.wait_not_in_text("#services-list", "test")
            b.wait_not_in_text("#services-list", ".target")
            b.wait_not_in_text("#services-list", ".socket")
            b.wait_not_in_text("#services-list", ".timer")
            b.wait_visible(self.svc_sel("systemd-ask-password-console.path"))

    def testLogs(self):
        self._testLogs(user=False)

    def testLogsUser(self):
        self._testLogs(user=True)

    def _testLogs(self, user=False):
        b = self.browser
        service_type = "user-service" if user else "service"

        def append_user(url):
            if user:
                return url + ("" if "#/" in url else "#/") + "?owner=user"
            return url

        self.make_test_service("/etc/systemd/user" if user else "/etc/systemd/system")

        self.login_and_go(append_user("/system/services"))
        self.wait_page_load()

        # Check test.service and then start it
        self.goto_service("test.service")
        self.check_service_off()
        b.wait_text('.cockpit-log-panel .pf-v5-c-card__body', "No log entries")
        self.toggle_onoff()
        self.check_service_on()
        b.wait_in_text('.cockpit-log-panel .pf-v5-c-card__body', "START")
        b.wait_in_text('.cockpit-log-panel .pf-v5-c-card__body', "WORKING")
        b.click(".cockpit-log-panel .pf-v5-c-card__header button")

        b.enter_page("/system/logs")
        b.wait_in_text("#journal-box .cockpit-log-panel > div:nth-child(2)", "WORKING")
        t = b.text("#journal-box .cockpit-log-panel > div:nth-child(3)") + b.text("#journal-box .cockpit-log-panel > div:nth-child(4)")
        self.assertIn("START", t)
        b.wait_val("#journal-grep input", f"priority:debug {service_type}:test.service ")
        b.go(append_user("/system/services#/test.service"))
        b.enter_page("/system/services")

        b.wait_in_text(".cockpit-logline:nth-child(2)", "WORKING")
        b.click(".cockpit-logline:nth-child(2)")
        b.enter_page("/system/logs")
        b.wait_text(".pf-v5-c-card__title", "WORKING")
        b.click(".pf-v5-c-breadcrumb li:contains('Logs')")
        b.wait_val("#journal-grep input", f"priority:debug {service_type}:test.service ")
        b.go(append_user("/system/services#/test.service"))
        b.enter_page("/system/services")

        b.wait_in_text(".cockpit-logline:nth-child(2)", "WORKING")
        b.click(".cockpit-logline:nth-child(2)")
        b.enter_page("/system/logs")
        b.click("button:contains('Go to test.service')")
        b.enter_page("/system/services")
        b.wait_js_cond('window.location.hash === "' + append_user("#/test.service") + '"')

    @testlib.nondestructive
    def testApi(self):
        b = self.browser

        self.make_test_service()

        self.login_and_go("/playground/service#/test")

        b.wait_text('#exists', 'true')
        b.wait_text('#state', '"stopped"')
        b.wait_text('#enabled', 'false')

        b.click('#start')
        b.wait_text('#state', '"running"')
        b.click('#stop')
        b.wait_text('#state', '"stopped"')

        b.click('#enable')
        b.wait_text('#enabled', 'true')
        b.click('#disable')
        b.wait_text('#enabled', 'false')

        b.go('#/foo')
        b.wait_text('#exists', 'false')

    @testlib.nondestructive
    def testConditions(self):
        m = self.machine
        b = self.browser
        self.write_file("/etc/systemd/system/condtest.service",
                        """
[Unit]
Description=Test Service
ConditionDirectoryNotEmpty=/var/tmp/empty

[Service]
ExecStart=/bin/sh -c "while true; do sleep 5; done"

[Install]
WantedBy=multi-user.target
""")

        m.execute("mkdir -p /var/tmp/empty")
        m.execute("rm -rf /var/tmp/empty/*")
        self.addCleanup(m.execute, "systemctl stop condtest")

        # After writing files out tell systemd about them
        m.execute("systemctl daemon-reload")
        self.machine.execute("systemctl start multi-user.target")

        # This does not work for not enabled services. See:
        # https://github.com/systemd/systemd/issues/2234
        self.machine.execute("systemctl enable condtest")

        self.login_and_go("/system/services")

        # Selects Services tab
        self.pick_tab(1)

        self.wait_service_in_panel("condtest.service", "Enabled")
        self.goto_service("condtest.service")
        self.check_service_details(["Not running", "Automatically starts"],
                                   ["Start", "Disallow running (mask)", "Pin unit"],
                                   enabled=True)
        self.do_action("Start")
        b.wait_text("#condition", "Condition ConditionDirectoryNotEmpty=/var/tmp/empty was not met")

        # If the condition succeeds the message disappears
        m.execute("touch /var/tmp/empty/non-empty")
        self.addCleanup(m.execute, "rm /var/tmp/empty/non-empty")
        self.do_action("Start")
        self.check_service_on(expect_reload=False)
        b.wait_not_present("#condition")

    @testlib.nondestructive
    def testRelationships(self):
        self._testRelationships()

    @testlib.nondestructive
    def testRelationshipsUser(self):
        self._testRelationships(user=True)

    def _testRelationships(self, user=False):
        b = self.browser

        systemd_path = "/etc/systemd/user" if user else "/etc/systemd/system"

        self.write_file(f"{systemd_path}/test-a.service",
                        """
[Unit]
Description=A Service
Before=test-b.service
Conflicts=test-c.service

[Service]
ExecStart=/bin/sh -c "while true; do sleep 5; done"
""")

        self.write_file(f"{systemd_path}/test-b.service",
                        """
[Unit]
Description=B Service
After=test-a.service

[Service]
ExecStart=/bin/sh -c "while true; do sleep 5; done"
""")

        self.write_file(f"{systemd_path}/test-c.service",
                        """
[Unit]
Description=C Service
Conflicts=test-a.service
PartOf=test-b.service

[Service]
ExecStart=/bin/sh -c "while true; do sleep 5; done"
""")

        # After writing files out tell systemd about them
        self.run_systemctl(user, "daemon-reload || true")

        def rel_sel(reltype, service):
            return f"#{''.join(reltype.split(' '))} a:contains('{service}')"

        # services page
        url = "/system/services#/?owner=user" if user else "/system/services"
        self.login_and_go(url, user="admin")
        self.wait_page_load()

        self.wait_service_present("test-a.service")
        self.goto_service("test-a.service")

        usr_query_str = "?owner=user" if user else ""

        # service a
        b.wait_js_cond(f'window.location.hash === "#/test-a.service{usr_query_str}"')
        b.click("#service-details-show-relationships button")
        b.wait_visible(rel_sel("Before", "test-b.service"))
        b.wait_visible(rel_sel("Conflicts", "test-c.service"))
        b.click(rel_sel("Before", "test-b.service"))

        # service b
        b.wait_js_cond(f'window.location.hash === "#/test-b.service{usr_query_str}"')
        b.click("#service-details-show-relationships button")
        b.wait_visible(rel_sel("After", "test-a.service"))
        b.click(rel_sel("After", "test-a.service"))

        # service a
        b.wait_js_cond(f'window.location.hash === "#/test-a.service{usr_query_str}"')
        b.click("#service-details-show-relationships button")
        b.wait_visible(rel_sel("Conflicts", "test-c.service"))
        b.click(rel_sel("Conflicts", "test-c.service"))

        # service c
        b.wait_js_cond(f'window.location.hash === "#/test-c.service{usr_query_str}"')
        b.click("#service-details-show-relationships button")
        b.wait_visible(rel_sel("Conflicts", "test-a.service"))
        b.wait_visible(rel_sel("Part of", "test-b.service"))
        b.click(rel_sel("Part of", "test-b.service"))

        # service b
        b.wait_js_cond(f'window.location.hash === "#/test-b.service{usr_query_str}"')
        b.click("#service-details-show-relationships button")
        b.wait_visible(rel_sel("After", "test-a.service"))

    @testlib.nondestructive
    def testNotFound(self):
        m = self.machine
        b = self.browser

        self.write_file("/etc/systemd/system/test.service",
                        """
[Unit]
Description=Test Service
Requires=not-found.service

[Service]
ExecStart=/bin/true
""")
        m.execute("systemctl daemon-reload")
        self.machine.execute("systemctl start default.target")

        self.login_and_go("/system/services")
        self.wait_page_load()

        b.wait_visible(self.svc_sel("test.service"))
        b.wait_not_present(self.svc_sel("not-found.service"))

        self.goto_service("test.service")
        b.wait_js_cond('window.location.hash === "#/test.service"')

        b.click("#service-details-show-relationships button")
        b.wait_visible("#Requires a.pf-m-disabled:contains('not-found.service')")

    @testlib.nondestructive
    def testResetFailed(self):
        m = self.machine
        b = self.browser

        # Put it in /run instead of in /etc so that we can mask it,
        # which needs to put a symlink into /etc.
        self.write_file("/run/systemd/system/test-fail.service",
                        """
[Unit]
Description=Failing Test Service

[Service]
ExecStart=/usr/bin/false

[Install]
WantedBy=default.target
""")

        m.execute("systemctl daemon-reload")
        self.machine.execute("systemctl start default.target")

        self.login_and_go("/system/services")
        self.wait_page_load()

        self.goto_service("test-fail.service")
        b.wait_js_cond('window.location.hash === "#/test-fail.service"')

        self.check_service_off()

        self.wait_onoff(val=False)
        self.toggle_onoff()

        self.check_service_details(["Failed to start", "Automatically starts"],
                                   ["Start", "Clear 'Failed to start'", "Disallow running (mask)", "Pin unit"], enabled=True)

        # Disabling should reset the "Failed to start" status
        self.wait_onoff(val=True)
        self.toggle_onoff()
        self.check_service_off()

        self.wait_onoff(val=False)
        self.toggle_onoff()

        self.check_service_details(["Failed to start", "Automatically starts"],
                                   ["Start", "Clear 'Failed to start'", "Disallow running (mask)", "Pin unit"], enabled=True)

        # Just reset the failed status, but leave it enabled
        self.do_action("Clear")

        self.check_service_details(["Not running", "Automatically starts"],
                                   ["Start", "Disallow running (mask)", "Pin unit"], enabled=True)

        self.do_action("Start")
        self.check_service_details(["Failed to start", "Automatically starts"],
                                   ["Start", "Clear 'Failed to start'", "Disallow running (mask)", "Pin unit"], enabled=True)

        # Masking should also clear the failed status
        self.do_action("Disallow running (mask)")
        b.click("#mask-service button.pf-m-danger")
        self.check_service_details(["Masked"], ["Allow running (unmask)"], enabled=False, onoff=False)

    def count_failures(self):
        return int(self.machine.execute("systemctl --failed --plain --no-legend | wc -l"))

    def check_system_menu_services_error(self, expected, pixel_label=None):
        b = self.browser
        b.switch_to_top()
        if expected:
            b.wait_visible("#services-error")
        else:
            b.wait_not_present("#services-error")

        if pixel_label:
            b.assert_pixels("#nav-system", pixel_label, skip_layouts=["mobile"])
            b.set_layout("mobile")
            b.click("#nav-system-item")
            b.assert_pixels_in_current_layout("#nav-system", pixel_label)
            b.click("#nav-system-item")
            b.set_layout("desktop")

    @testlib.nondestructive
    def testNotifyFailed(self):
        m = self.machine
        b = self.browser

        # We use two services with prefix "aaa" and "aab" so that
        # they always occupy the first two rows and we can check their
        # relative order depending on whether "aab" has failed or not.

        self.write_file("/etc/systemd/system/aaa.service",
                        """
[Unit]
Description=First Row Service

[Service]
ExecStart=/usr/bin/true
""")
        self.write_file("/etc/systemd/system/aab-fail.service",
                        """
[Unit]
Description=Failing Test Service

[Service]
ExecStart=/usr/bin/false
""")

        m.execute("systemctl daemon-reload")
        self.machine.execute("systemctl start default.target")
        self.machine.execute("systemctl reset-failed")

        self.login_and_go("/system/services")
        self.wait_page_load()

        # Start test-fail
        m.execute("systemctl start aab-fail")

        # Services tab should have icon
        # Aab-fail should be at the top, and have failed
        b.wait_visible('a:contains("Services") .ct-exclamation-circle')
        b.wait_visible('tr[data-goto-unit="aab-fail.service"]')
        b.wait_visible('tr[data-goto-unit="aab-fail.service"] .service-unit-status:contains("Failed")')

        # Nav link should have icon
        self.check_system_menu_services_error(expected=True, pixel_label="menu_error")

        n_failures = self.count_failures()
        health_message = "1 service has failed" if n_failures == 1 else f"{n_failures} services have failed"

        # System page should have notification
        b.click_system_menu("/system")
        b.wait_in_text(".system-health-events", health_message)

        # Clear aab-fail
        m.execute("systemctl reset-failed aab-fail")

        # aab-fail should not be at the top
        b.switch_to_top()
        b.click_system_menu("/system/services")
        b.wait_not_present('li:nth-child(1)[data-goto-unit="aab-fail.service"]')

        if self.count_failures() == 0:
            # Services tab should not have icon
            b.wait_not_present('a:contains("Services") .ct-exclamation-circle')

            # Nav link should not have icon
            self.check_system_menu_services_error(expected=False)

            # System page should not have notification
            b.click_system_menu('/system')
            b.wait_not_in_text(".system-health-events", "service has failed")
            b.wait_not_in_text(".system-health-events", "services have failed")

    @testlib.nondestructive
    def testHiddenFailure(self):
        m = self.machine
        b = self.browser

        self.write_file("/etc/systemd/system/fail.mount",
                        """
[Unit]
Description=Failing Mount

[Mount]
What=wrong
Where=/fail
""")

        m.execute("systemctl daemon-reload")
        self.machine.execute("systemctl start default.target")
        self.machine.execute("systemctl reset-failed")
        m.execute("systemctl start fail.mount || true")

        self.login_and_go("/system/services")
        self.wait_page_load()

        if self.count_failures() == 1:
            # Nav link should not have icon
            self.check_system_menu_services_error(expected=False)

            # System page should not have notification
            b.click_system_menu("/system")
            b.wait_not_in_text(".system-health-events", "service has failed")
            b.wait_not_in_text(".system-health-events", "services have failed")

        self.allow_journal_messages(".*type=1400 audit(.*): avc:  denied  { create } .* comm=\"systemd\" name=\"fail\".*")

    @testlib.nondestructive
    def testTransientUnits(self):
        m = self.machine
        b = self.browser

        self.login_and_go("/system/services")
        self.wait_page_load()

        m.execute("systemd-run --collect --unit test-autocollect@1.service sh -c 'sleep 5; false'")
        m.execute("systemd-run --unit test-manual-collect@1.service sh -c 'sleep 5; false'")

        self.wait_service_present("test-autocollect@1.service")
        self.wait_service_present("test-manual-collect@1.service")

        self.wait_service_in_panel("test-manual-collect@1.service", "Failed to start")

        self.wait_service_not_present("test-autocollect@1.service")
        self.wait_service_present("test-manual-collect@1.service")

        m.execute("systemctl reset-failed")
        self.wait_service_not_present("test-manual-collect@1.service")

        # details page handles units going away
        m.execute("systemd-run --collect --unit test-transient.service sleep infinity")
        self.addCleanup(m.execute, "systemctl stop test-transient.service || true")
        self.wait_service_present("test-transient.service")
        self.goto_service("test-transient.service")
        self.check_service_details(["Static", "Running"], ["Restart", "Stop", "Disallow running (mask)", "Pin unit"], enabled=False, onoff=False)

        self.do_action("Stop")

        b.wait_in_text(".pf-v5-c-empty-state", "Unit test-transient.service not found")
        b.click("button:contains('View all services')")
        b.switch_to_top()
        b.wait_js_cond('window.location.pathname == "/system/services"')

    @testlib.nondestructive
    def testServicesThemeConsistency(self):
        b = self.browser

        self.login_and_go("/system/services")
        self.wait_page_load()

        b.wait_not_present("html.pf-v5-theme-dark")
        b.switch_to_top()
        b.open_superuser_dialog()
        b.click(".pf-v5-c-modal-box:contains('Switch to limited access') button:contains('Limit access')")
        b.check_superuser_indicator("Limited access")
        b.enter_page("/system/services")
        b.wait_not_present("html.pf-v5-theme-dark")

    @testlib.nondestructive
    def testServicesFiltersURLConsistency(self):
        b = self.browser

        # this filter matches nothing
        self.login_and_go("/system/services#/?activestate=[\"Running\"]&filestate=[\"Disabled\"%2C\"Enabled\"]&name=test&type=service&owner=user")
        self.wait_page_load()

        b.wait_visible(".pf-v5-c-chip-group:contains(Active state) li:contains(Running)")
        b.wait_visible(".pf-v5-c-chip-group:contains(File state) li:contains(Disabled)")
        b.wait_visible(".pf-v5-c-chip-group:contains(File state) li:contains(Enabled)")
        b.wait_visible(".services-text-filter input[value=test]")
        b.wait_visible(".pf-v5-c-nav__list a.pf-m-current:contains(Services)")
        b.wait_visible("#user.pf-m-selected")
        # there is no running "test" service
        self.browser.wait_in_text(".pf-v5-c-empty-state", "No matching results")
        # filter survives a reload, same URL
        b.reload()
        b.enter_page("/system/services")
        self.browser.wait_in_text(".pf-v5-c-empty-state", "No matching results")

        # Changing the UI filters should also affect the URL
        b.click("#system")
        url = b.eval_js("window.location.hash")
        self.assertIn("owner=system", url)

        # Mass remove active state filters and check the URL
        b.click(".pf-v5-c-chip-group:contains(Active state) .pf-v5-c-chip-group__close button")
        b.wait_js_cond('window.location.hash == "#/?filestate=%5B%22Disabled%22%2C%22Enabled%22%5D&name=test&type=service&owner=system"')

        b.click(".pf-v5-c-toolbar__content > .pf-v5-c-toolbar__item > button:contains('Clear all filters')")
        b.wait_js_cond('window.location.hash == "#/?type=service&owner=system"')

        # Test text input clear button
        b.go('/system/services#/?name=test')
        b.wait_visible(".services-text-filter input[value=test]")
        b.click(".services-text-filter button")
        b.wait_js_cond('window.location.hash == "#/"')

        # ensure that there is at least one running service
        self.make_test_service("/etc/systemd/user")
        self.run_systemctl(user=True, cmd="enable --now test.service")
        b.click("#user")
        self.wait_service_state("test.service", "active")
        self.wait_service_state("test.service", "Enabled")

        # drop the name search; all our images have at least stopped and static unit, which should not appear
        b.go("/system/services#/?activestate=[\"Running\"]&filestate=[\"Disabled\"%2C\"Enabled\"]&type=service&owner=user")
        self.wait_page_load()
        b.wait_in_text(".services-list", "Running")
        # filters out non-matching units
        self.assertNotIn("Not running", b.text(".services-list"))
        self.assertNotIn("Static", b.text(".services-list"))
        # survives a page reload
        b.reload()
        b.enter_page("/system/services")
        b.wait_in_text(".services-list", "Running")
        self.assertNotIn("Not running", b.text(".services-list"))
        self.assertNotIn("Static", b.text(".services-list"))

    @testlib.nondestructive
    def testAlias(self):
        m = self.machine
        b = self.browser
        self.write_file("/etc/systemd/system/flower-rose.service",
                        """
[Unit]
Description=Smell sweet

[Service]
ExecStart=/bin/echo Perfume
RemainAfterExit=yes

[Install]
Alias=flower-byanyothername.service
WantedBy=multi-user.target
""")
        m.execute("systemctl enable flower-rose.service")
        self.addCleanup(m.execute, "systemctl disable --now flower-rose.service")

        self.login_and_go("/system/services#/?name=flower")

        # overview: primary unit
        self.wait_service_present("flower-rose.service")
        self.wait_service_in_panel("flower-rose.service", "Enabled")
        self.wait_service_state("flower-rose.service", "inactive")

        # overview: alias
        self.wait_service_present("flower-byanyothername.service")
        if m.image.startswith("rhel-8"):
            # old systemd does not have unit file state "alias" yet
            self.wait_service_in_panel("flower-byanyothername.service", "Enabled")
        else:
            self.wait_service_in_panel("flower-byanyothername.service", "Alias")
        # runtime status of aliases is unknown in list view
        self.wait_service_state("flower-byanyothername.service", "")

        # both react to state changes
        m.execute("systemctl start flower-byanyothername.service")
        self.wait_service_state("flower-rose.service", "active")
        self.wait_service_state("flower-byanyothername.service", "")

        # details: primary unit
        self.goto_service("flower-rose.service")
        b.wait_in_text("#statuses", "Running")
        b.wait_in_text("#statuses", "Automatically starts")
        b.wait_in_text("#service-details-unit", "/etc/systemd/system/flower-rose.service")
        b.wait_in_text('.cockpit-log-panel .pf-v5-c-card__body', "Perfume")

        # details: alias
        b.click(".pf-v5-c-breadcrumb a:contains('Services')")
        self.goto_service("flower-byanyothername.service")
        b.wait_in_text("#statuses", "Running")
        b.wait_in_text("#statuses", "Automatically starts")
        b.wait_in_text("#service-details-unit", "/etc/systemd/system/flower-rose.service")
        b.wait_in_text('.cockpit-log-panel .pf-v5-c-card__body', "Perfume")

    @testlib.nondestructive
    def testUnprivileged(self):
        b = self.browser
        m = self.machine

        self.make_test_service()

        self.login_and_go("/system/services", superuser=False)
        self.wait_page_load()

        # service list
        self.wait_service_in_panel("NetworkManager.service", "Enabled")
        self.wait_service_state("NetworkManager.service", "active")
        self.wait_service_in_panel("test.service", "Disabled")
        self.wait_service_state("test.service", "inactive")
        # reacts to changes
        m.execute("systemctl start test.service")
        self.wait_service_state("test.service", "active")

        # details page
        self.goto_service("test.service")
        self.check_service_details(["Read-only", "Running"], [], enabled=False, onoff=False, kebab=False)
        b.wait_in_text("#service-details-unit", "/etc/systemd/system/test.service")
        # unprivileged users cannot read the system journal on all OSes; only check it on Fedora, where we know it's allowed
        if m.image.startswith("fedora"):
            b.wait_in_text('.cockpit-log-panel .pf-v5-c-card__body', "WORKING")
        # reacts to changes
        m.execute("systemctl stop test.service")
        self.check_service_details(["Read-only", "Disabled"], [], enabled=False, onoff=False, kebab=False)


class TestTimers(testlib.MachineCase):
    def svc_sel(self, service):
        return f'tr[data-goto-unit="{service}"]'

    def wait_page_load(self):
        self.browser.wait_not_present(".pf-v5-c-empty-state .pf-v5-c-spinner[aria-valuetext='Loading...']")

    def testCreate(self):
        m = self.machine
        b = self.browser

        def wait_systemctl_timer(time):
            with testvm.Timeout(seconds=20, machine=m, error_message="Timeout while waiting for systemctl list-timers"):
                m.execute(f"cmd='systemctl list-timers'; until $cmd | grep -m 1 '{time}'; do sleep 1; done")

        # HACK: pmie and pmlogger take a looooong time to shutdown (https://bugzilla.redhat.com/show_bug.cgi?id=1703348)
        # so disable them for this test, we don't test PCP here
        m.execute("systemctl disable --now pmie pmlogger || true")

        # set an initial baseline date/time, to ensure that we never jump backwards in subsequent tests
        m.execute("timedatectl set-timezone UTC")
        m.execute("""ntp=`timedatectl show --property NTP --value`
                     if [ $ntp == "yes" ]; then
                         timedatectl set-ntp off
                     fi""")
        testlib.wait(lambda: "false" in m.execute(
            "busctl get-property org.freedesktop.timedate1 /org/freedesktop/timedate1 org.freedesktop.timedate1 NTP"))
        m.execute("timedatectl set-time '2036-01-01 12:00:00'")
        # this sometimes confuses PAM session tracking
        self.allow_journal_messages("cockpit-session: admin: couldn't close session: System error: Unknown error -7")
        self.reboot()

        m.execute("timedatectl set-time '2036-01-01 15:30:00'")
        self.login_and_go("/system/services")
        self.wait_page_load()
        # Select "Timers" tab
        self.browser.click('#services-filter li:nth-child(4) a')
        b.click('#create-timer')
        b.wait_visible("#timer-dialog")
        b.set_input_text("#servicename", "testing timer")
        m.execute("rm -f /tmp/date")
        b.set_input_text("#command", "/bin/sh -c '/bin/date >> /tmp/date'")
        b.click("input[value=specific-time]")
        b.set_input_text(".create-timer-time-picker input", "24:6s")
        b.click("#timer-save-button")

        # checks for invalid input messages
        b.wait_text("#servicename-helper", "Only alphabets, numbers, : , _ , . , @ , - are allowed")
        b.wait_text("#description-helper", "This field cannot be empty")
        b.wait_text(".pf-v5-c-date-picker__helper-text", "Invalid time format")

        # checks for command not found
        b.set_input_text("#servicename", "testing")
        b.set_input_text("#description", "desc")
        b.set_input_text("#command", "this is not found")
        b.set_input_text(".create-timer-time-picker input", "14:12")
        b.click("#timer-save-button")

        b.wait_text("#command-helper", "Command not found")

        # creates a new yearly timer at 2036-01-01 16:00 and at 2037-01-01 01:22
        b.set_input_text("#servicename", "yearly_timer")
        b.set_input_text("#description", "Yearly timer")
        b.set_input_text("#command", "/bin/sh -c '/bin/date >> /tmp/date'")
        b.select_from_dropdown("#drop-repeat", "yearly")
        b.click("[data-index='0'] [aria-label='Add']")
        b.set_input_text("[data-index='0'] .pf-v5-c-date-picker:nth-child(1) input", "2036-01-01")
        b.set_input_text("[data-index='0'] .pf-v5-c-date-picker:nth-child(2) input", "16:00")
        b.set_input_text("[data-index='1'] .pf-v5-c-date-picker:nth-child(1) input", "2037-01-01")
        b.set_input_text("[data-index='1'] .pf-v5-c-date-picker:nth-child(2) input", "01:22")

        # shows creation errors
        m.execute("chattr +i /etc/systemd/system")
        try:
            b.click("#timer-save-button")
            b.wait_in_text("#timer-dialog .pf-v5-c-alert", "Timer creation failed")
            b.wait_in_text("#timer-dialog .pf-v5-c-alert", "Not permitted")
        finally:
            m.execute("chattr -i /etc/systemd/system")

        b.click("#timer-save-button")
        b.wait_not_present("#timer-dialog")
        b.wait_visible(self.svc_sel('yearly_timer.timer'))

        m.execute("timedatectl set-time '2036-01-01 15:30:00'")
        b.wait_visible(self.svc_sel('yearly_timer.timer'))
        b.wait_in_text(self.svc_sel('yearly_timer.timer'), "Yearly timer")
        wait_systemctl_timer("2036-01-01 16:00")
        self.assertIn("2036-01-01 16:00", m.execute("systemctl list-timers"))
        m.execute("timedatectl set-time '2036-01-01 16:10:00'")
        # checks if yearly timer repeats yearly on 2037-01-01 01:22
        wait_systemctl_timer("2037-01-01 01:22")
        self.assertIn("2037-01-01 01:22", m.execute("systemctl list-timers"))
        # creates a new monthly timer that runs on 6th at 14:12 and 8th at 21:12 of every month
        b.click('#create-timer')
        b.wait_visible("#timer-dialog")
        b.set_input_text("#servicename", "monthly_timer")
        b.set_input_text("#description", "Monthly timer")
        b.set_input_text("#command", "/bin/sh -c '/bin/date >> /tmp/date'")
        b.click("input[value=specific-time]")
        b.select_from_dropdown("#drop-repeat", "monthly")
        b.select_from_dropdown("[data-index='0'] .month-days select", "6")
        b.set_input_text("[data-index='0'] .create-timer-time-picker input", "14:12")
        b.click("[data-index='0'] [aria-label='Add']")
        b.select_from_dropdown("[data-index='1'] .month-days select", "8")
        b.set_input_text("[data-index='1'] .create-timer-time-picker input", "21:12")
        b.click("#timer-save-button")
        b.wait_not_present("#timer-dialog")
        b.wait_visible(self.svc_sel('monthly_timer.timer'))

        m.execute("timedatectl set-time '2036-01-01 16:15:00'")
        b.wait_visible(self.svc_sel('monthly_timer.timer'))
        b.wait_in_text(self.svc_sel('monthly_timer.timer'), "Monthly timer")
        wait_systemctl_timer("2036-01-06 14:12")
        self.assertIn("2036-01-06 14:12", m.execute("systemctl list-timers"))
        m.execute("timedatectl set-time '2036-01-07 00:00:00'")
        wait_systemctl_timer("2036-01-08 21:12")
        self.assertIn("2036-01-08 21:12", m.execute("systemctl list-timers"))
        # checks if timer runs on next month February 2036 on same dates
        m.execute("timedatectl set-time '2036-01-08 21:23'")
        wait_systemctl_timer("2036-02-06 14:12")
        self.assertIn("2036-02-06 14:12", m.execute("systemctl list-timers"))
        # checks if timer runs on 8th March 2036 at 21:12
        m.execute("timedatectl set-time '2036-03-07 00:00:00'")
        wait_systemctl_timer("2036-03-08 21:12")
        self.assertIn("2036-03-08 21:12", m.execute("systemctl list-timers"))
        # creates a new weekly timer that runs on Fri at 12:30 and Sun at 20:12 every week
        b.click('#create-timer')
        b.wait_visible("#timer-dialog")
        b.set_input_text("#servicename", "weekly_timer")
        b.set_input_text("#description", "Weekly timer")
        b.set_input_text("#command", "/bin/sh -c '/bin/date >> /tmp/date'")
        b.click("input[value=specific-time]")
        b.select_from_dropdown("#drop-repeat", "weekly")
        b.wait_visible("[data-index='0'] .week-days")
        b.click("[data-index='0'] [aria-label='Add']")
        b.select_from_dropdown("[data-index='0'] .week-days select", "fri")
        # select time in TimePicker with mouse
        b.select_PF("#timer-dialog [data-index='0'] .create-timer-time-picker input", "12:30")
        # type time with keyboard
        b.select_from_dropdown("[data-index='1'] .week-days select", "sun")
        b.set_input_text("[data-index='1'] .create-timer-time-picker input", "20:12")
        b.click("#timer-save-button")
        b.wait_not_present("#timer-dialog")
        b.wait_visible(self.svc_sel('weekly_timer.timer'))

        m.execute("timedatectl set-time '2036-03-08 00:00:00'")
        b.wait_visible(self.svc_sel('weekly_timer.timer'))
        b.wait_in_text(self.svc_sel('weekly_timer.timer'), "Weekly timer")
        wait_systemctl_timer("Sun 2036-03-09 20:12")
        self.assertIn("Sun 2036-03-09 20:12", m.execute("systemctl list-timers"))
        m.execute("timedatectl set-time '2036-03-10 00:00:00'")
        wait_systemctl_timer("Fri 2036-03-14 12:30")
        self.assertIn("Fri 2036-03-14 12:30", m.execute("systemctl list-timers"))
        # checks if timer runs on next week's Friday and Sunday
        m.execute("timedatectl set-time '2036-03-15 00:00:00'")
        wait_systemctl_timer("Sun 2036-03-16 20:12")
        self.assertIn("Sun 2036-03-16 20:12", m.execute("systemctl list-timers"))
        m.execute("timedatectl set-time '2036-03-17 00:00:00'")
        wait_systemctl_timer("Fri 2036-03-21 12:30")
        self.assertIn("Fri 2036-03-21 12:30", m.execute("systemctl list-timers"))
        # creates a new daily timer that runs at 2:40 and at 21:15 every day
        b.click('#create-timer')
        b.wait_visible("#timer-dialog")
        b.set_input_text("#servicename", "daily_timer")
        b.set_input_text("#description", "Daily timer")
        b.set_input_text("#command", "/bin/sh -c '/bin/date >> /tmp/date'")
        b.click("input[value=specific-time]")
        b.select_from_dropdown("#drop-repeat", "daily")
        b.click("[data-index='0'] [aria-label='Add']")
        b.set_input_text("[data-index='0'] .create-timer-time-picker input", "2:40")
        b.set_input_text("[data-index='1'] .create-timer-time-picker input", "21:15")
        b.click("#timer-save-button")
        b.wait_not_present("#timer-dialog")
        b.wait_visible(self.svc_sel('daily_timer.timer'))

        m.execute("timedatectl set-time '2036-03-17 00:00:00'")
        b.wait_visible(self.svc_sel('daily_timer.timer'))
        b.wait_in_text(self.svc_sel('daily_timer.timer'), "Daily timer")
        wait_systemctl_timer("2036-03-17 02:40")
        self.assertIn("2036-03-17 02:40", m.execute("systemctl list-timers"))
        m.execute("timedatectl set-time '2036-03-17 03:00:00'")
        wait_systemctl_timer("2036-03-17 21:15")
        self.assertIn("2036-03-17 21:15", m.execute("systemctl list-timers"))
        # checks if timer runs on 2036-04-10 at 02:40 and 21:15
        m.execute("timedatectl set-time '2036-04-10 00:00:00'")
        wait_systemctl_timer("2036-04-10 02:40")
        self.assertIn("2036-04-10 02:40", m.execute("systemctl list-timers"))
        m.execute("timedatectl set-time '2036-04-10 03:00:00'")
        wait_systemctl_timer("2036-04-10 21:15")
        self.assertIn("2036-04-10 21:15", m.execute("systemctl list-timers"))
        # creates a new houry timer that runs at *:05 and at *:26
        b.click('#create-timer')
        b.wait_visible("#timer-dialog")
        b.set_input_text("#servicename", "hourly_timer")
        b.set_input_text("#description", "Hourly timer")
        b.set_input_text("#command", "/bin/sh -c '/bin/date >> /tmp/date'")
        b.click("input[value=specific-time]")
        b.select_from_dropdown("#drop-repeat", "hourly")
        b.click("[data-index='0'] [aria-label='Add']")
        b.set_input_text("[data-index='0'] input", "05")
        b.set_input_text("[data-index='1'] input", "26")
        b.click("#timer-save-button")
        b.wait_not_present("#timer-dialog")
        b.wait_visible(self.svc_sel('hourly_timer.timer'))

        m.execute("timedatectl set-time '2036-04-10 03:00:00'")
        b.wait_visible(self.svc_sel('hourly_timer.timer'))
        b.wait_in_text(self.svc_sel('hourly_timer.timer'), "Hourly timer")
        wait_systemctl_timer("2036-04-10 03:05")
        self.assertIn("2036-04-10 03:05", m.execute("systemctl list-timers"))
        m.execute("timedatectl set-time '2036-04-10 03:07:00'")
        wait_systemctl_timer("2036-04-10 03:26")
        self.assertIn("2036-04-10 03:26", m.execute("systemctl list-timers"))
        m.execute("timedatectl set-time '2036-04-10 04:00:00'")
        wait_systemctl_timer("2036-04-10 04:05")
        # checks if timer runs on next hour at 5 min and 26 min
        self.assertIn("2036-04-10 04:05", m.execute("systemctl list-timers"))
        m.execute("timedatectl set-time '2036-04-10 04:10:00'")
        wait_systemctl_timer("2036-04-10 04:26")
        self.assertIn("2036-04-10 04:26", m.execute("systemctl list-timers"))

        # creates a new minutely timer that runs at *:*:05 and at *:*:20
        b.click('#create-timer')
        b.wait_visible("#timer-dialog")
        b.set_input_text("#servicename", "minutely_timer")
        b.set_input_text("#description", "Minutely timer")
        b.set_input_text("#command", "/bin/sh -c '/bin/date >> /tmp/date'")
        b.select_from_dropdown("#drop-repeat", "minutely")
        b.click("[data-index='0'] [aria-label='Add']")
        b.set_input_text("[data-index='0'] input", "05")
        b.set_input_text("[data-index='1'] input", "20")
        b.click("#timer-save-button")
        b.wait_not_present("#timer-dialog")
        b.wait_visible(self.svc_sel('minutely_timer.timer'))

        m.execute("timedatectl set-time '2036-04-10 04:15:07'")
        b.wait_visible(self.svc_sel('minutely_timer.timer'))
        b.wait_in_text(self.svc_sel('minutely_timer.timer'), "Minutely timer")
        wait_systemctl_timer("2036-04-10 04:15:20")
        self.assertIn("2036-04-10 04:15:20", m.execute("systemctl list-timers | grep minutely_timer.timer"))
        m.execute("timedatectl set-time '2036-04-10 04:15:55'")
        wait_systemctl_timer("2036-04-10 04:16:05")
        self.assertIn("2036-04-10 04:16:05", m.execute("systemctl list-timers | grep minutely_timer.timer"))

        # creates a new timer that runs at today at 23:59
        b.click('#create-timer')
        b.wait_visible("#timer-dialog")
        b.set_input_text("#servicename", "no_repeat_timer")
        b.set_input_text("#description", "No repeat timer")
        b.set_input_text("#command", "/bin/sh -c '/bin/date >> /tmp/date'")
        b.click("input[value=specific-time]")
        b.set_input_text(".create-timer-time-picker input", "23:59")
        b.click("#timer-save-button")
        b.wait_not_present("#timer-dialog")
        b.wait_visible(self.svc_sel('no_repeat_timer.timer'))
        b.wait_in_text(self.svc_sel('no_repeat_timer.timer'), "No repeat timer")

        m.execute("timedatectl set-time '2036-04-10 04:10:00'")
        b.wait_visible(self.svc_sel('no_repeat_timer.timer'))
        wait_systemctl_timer("2036-04-10 23:59")
        self.assertIn("2036-04-10 23:59", m.execute("systemctl list-timers"))

        # creates a boot timer that runs after 10 sec from boot
        b.click('#create-timer')
        b.wait_visible("#timer-dialog")
        b.set_input_text("#servicename", "boot_timer")
        b.set_input_text("#description", "Boot timer")
        b.set_input_text("#command", "/bin/sh -c 'echo hello >> /tmp/hello'")
        b.click("input[value=system-boot]")
        b.set_input_text(".delay-group input", "2")
        b.click("#timer-save-button")
        b.wait_not_present("#timer-dialog")
        b.wait_visible(self.svc_sel('boot_timer.timer'))
        self.reboot()
        m.start_cockpit()
        with testvm.Timeout(seconds=15, machine=m, error_message="Timeout while waiting for boot timer to run"):
            m.execute("while [ ! -f /tmp/hello ] ; do sleep 0.5; done")
        self.assertIn("hello", m.execute("cat /tmp/hello"))


if __name__ == '__main__':
    testlib.test_main()
